Honor catchup for historical asset events in asset-triggered Dags #68749
When an asset-triggered Dag is added to a deployment whose assets already have event history, its first run swept in every asset event ever recorded because the event window fell back to date.min when no previous run existed. A brand-new consumer therefore reprocessed the entire backlog on first run. Bound the first run's window at the time the Dag started scheduling on its assets (the schedule reference's created_at, which is preserved across re-parsing) so a new Dag only receives events that occurred after it began consuming the asset. Closes: apache#39456
|
I wonder if the dag’s |
If we want to allow such a feature, I wouldn't overload the existing |
This entirely depends on how the task actually reads data, doesn’t it? If it just reads the current data, the same argument can be made against scheduled catchups too. For catchup to work, you need to implement the task in a way that makes catchup possible in the first place. With scheduled dags, this means it should use the data interval or logical date. With assets, it should use the asset event date, which would be in the past. I don’t think this argument holds. |
Gotcha, it makes sense - I've probably overstated.
WDYT? I'm fine with putting the above for a discussion and lazy consensus in the dev list. |
|
I actually think we can just use catchup because
|
Bounding the first-run event window unconditionally removed any way for a newly added asset-triggered Dag to intentionally replay an asset's full history. Reusing the existing catchup flag — which already means "consider history when the Dag is newly added" — restores that choice: catchup off (the default) skips the backlog, catchup on replays it.
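The fallback order for the lower bound of a run's event window can be sketched in plain Python. This is only an illustration of the rule described in this PR, not the scheduler's code: the names `event_window_floor` and `events_for_run`, their parameters, the sample timestamps, and the strict `>` comparison are all assumptions made for the example. The actual change expresses the same order as a SQL `coalesce`.

```python
from datetime import datetime, timezone

# Stand-in for the "date.min" fallback mentioned in this PR (assumed to be UTC-aware here).
DATE_MIN = datetime.min.replace(tzinfo=timezone.utc)


def event_window_floor(prev_run_after, reference_created_at, catchup):
    """Hypothetical helper: pick the lower bound of the asset-event window.

    Mirrors a coalesce-style chain: previous run first, then (only when
    catchup is off) the schedule reference's created_at, then DATE_MIN.
    """
    if prev_run_after is not None:
        return prev_run_after
    if not catchup and reference_created_at is not None:
        return reference_created_at
    return DATE_MIN


def events_for_run(event_times, floor):
    """Hypothetical helper: keep events after the floor (strictness is an assumption)."""
    return [t for t in event_times if t > floor]


# Illustrative data: two events predate the Dag's schedule reference, one follows it.
created_at = datetime(2024, 9, 1, tzinfo=timezone.utc)
history = [
    datetime(2024, 1, 1, tzinfo=timezone.utc),
    datetime(2024, 6, 1, tzinfo=timezone.utc),
    datetime(2025, 1, 1, tzinfo=timezone.utc),
]

first_run_no_catchup = events_for_run(
    history, event_window_floor(None, created_at, catchup=False)
)
first_run_catchup = events_for_run(
    history, event_window_floor(None, created_at, catchup=True)
)
```

In this sketch, a first run with catchup off sees only the event recorded after `created_at`, while a first run with catchup on sees all three. Once a previous run exists, its `run_after` wins regardless of the flag.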
|
Hi maintainer, this PR was merged without a milestone set.
|
Backport successfully created: v3-3-test
Note: As of Merging PRs targeted for Airflow 3.X In matter of doubt please ask in #release-management Slack channel.
|
Human Summary
closes: #39456
related: #39603
Asset-triggered Dags now honor `catchup` for historical asset events. With `catchup=False` (the default), a Dag newly added to assets that already have event history no longer replays the backlog on its first run; the event window starts when the Dag began scheduling on those assets. With `catchup=True`, the backlog is still consumed.
AI Summary
Bug. In `SchedulerJobRunner._create_dag_runs_asset_triggered`, the set of asset events attached to a run is bounded below by the previous asset-triggered run's `run_after`, falling back to `date.min` when there is no previous run. For a Dag's very first run this means every asset event ever recorded for its assets is consumed, so a Dag newly added to assets that already have history reprocesses the entire backlog on its first run, unconditionally.
Fix. Gate the backlog behind the Dag's `catchup` flag (as discussed in the PR thread, `catchup` already expresses "should history be considered when the Dag is newly added"). When `catchup` is off, the Dag's earliest schedule-reference `created_at` is inserted into the `coalesce` fallback chain before `date.min`.
So with `catchup=False` the first run's window is floored at the moment the Dag started scheduling on its assets; with `catchup=True` the floor stays `date.min` and the full history replays, matching `catchup` semantics for time-scheduled Dags. Subsequent runs are unchanged either way (still bounded by the previous run's `run_after`). `created_at` is a reliable cut-off because schedule references are updated in place during parsing (`dag_processing/collection.py`), not deleted and recreated, so it survives re-serialization.
Scope. Direct asset references only. The asset-alias path is deliberately left on `date.min`: an alias consumer is expected to pick up events attached to its alias regardless of timing (covered by `test_create_dag_runs_asset_alias_with_asset_event_attached`).
This is the same fix idea as the stale 2.x PR #39603, re-expressed for the current SQL-native query (a scalar subquery over a single `coalesce`, avoiding an implicit cartesian join) and now gated by `catchup`.
Validation.
- `test_new_asset_triggered_dag_backlog_gated_by_catchup` covering both sides: `catchup=False` ignores the pre-creation backlog (fails on `main`), `catchup=True` consumes it.
- `test_create_dag_runs_assets`, which previously encoded the buggy behavior.
- `airflow-core/newsfragments/68749.bugfix.rst` describing the behavior change.
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (Opus 4.8, 1M context) following the guidelines